Bike sharing systems are a means of renting bicycles where the process of obtaining membership, rental, and bike return is automated via a network of kiosk locations throughout a city. Using these systems, people are able rent a bike from a one location and return it to a different place on an as-needed basis. Currently, there are over 500 bike-sharing programs around the world.
The data generated by these systems makes them attractive for researchers because the duration of travel, departure location, arrival location, and time elapsed is explicitly recorded. Bike sharing systems therefore function as a sensor network, which can be used for studying mobility in a city. In this competition, participants are asked to combine historical usage patterns with weather data in order to forecast bike rental demand in the Capital Bikeshare program in Washington, D.C.
Import Apache Spark 2.1libraries and setup environment variables. In case for cluster deployment you should probably remove this cell.
import os
import sys
import time
# Change to path where apache spark 2.x is downloaded
SPARK_PATH = '/users/suchy/Documents/spark-2.1.0-bin-hadoop2.7'
os.environ['SPARK_HOME'] = SPARK_PATH
SPARK_HOME = os.environ['SPARK_HOME']
#Add the following paths to the system path.
sys.path.insert(0,os.path.join(SPARK_HOME,"python"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","pyspark.zip"))
sys.path.insert(0,os.path.join(SPARK_HOME,"python","lib","py4j-0.10.4-src.zip"))
Initialize Spark Context for driver program
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
conf = (SparkConf()
.setAppName("Capital_Bike_weather_mining")
.set("spark.executor.memory", "4g")
.set("spark.driver.memory", "8g"))
#.setMaster("yarn") - for cluster deployment
sc = SparkContext.getOrCreate(conf = conf)
sqlContext = SQLContext(sc)
sc = sc.setCheckpointDir("checkpoint")
Import pandas, numpy, matplotlib,and seaborn. Then set %matplotlib inline
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)
We are provided hourly rental data spanning two years. For this competition, the training set is comprised of the first 19 days of each month, while the test set is the 20th to the end of the month. We must predict the total count of bikes rented during each hour covered by the test set, using only information available prior to the rental period.
</a>Read in the Customers Bike Rental csv train and test files as a Pandas DataFrame.
train_df = pd.read_csv("../data/train.csv")
test_df = pd.read_csv("../data/test.csv")
There are 3 dependent variables: casual, registered and count (casual + registered) .
We will try both methods: predict amount of casual and regitered rentals separately and predict sum of those (count)
train_df.head()
test_df.head()
Check out customers rental info() and describe() methods. Thera are total 10866 entries for traing data and 6493 entries for test data, none of the column has missing values
train_df.info()
test_df.info()
continuous_var = ['temp', 'atemp', 'humidity', 'windspeed']
# exclude binary variables
categorical_var=['season','weather','year','month','hour','dayofweek','weekofyear','quarter','hour_cat', 'temp_cat']
dependent_variables = ['casual', 'registered', 'count']
added_cat_var = ['season_cat', 'month_cat', 'dayofweek_cat', 'weather_cat']
seasonDict = {1: "Winter", 2 : "Spring", 3 : "Summer", 4 :"Fall" }
monthDict = {1: "January", 2 : "February", 3 : "March", 4 :"April" ,\
5 : "May", 6 : "June", 7 : "July", 8 :"August" ,\
9: "September", 10 : "October", 11 : "November", 12 :"December"}
dayofweekDict = {0: "Monday", 1 : "Tuesday", 2 : "Wednesday", 3 :"Thursday" ,\
4 : "Friday", 5 : "Saturday", 6 : "Sunday"}
weatherDict = {1: "Clear", 2 : "Mist", 3 : "Light_Snow", 4 :"Heavy_Rain" }
Data preparation part: parse data timestamp, add dummy and derivated variables. For data including categorical variables with different number of levels, random forests are biased in favor of those attributes with more levels. So it's good idea to have categorical variables with similar level of unique values.
Lot's of data from the function below are taken in next section of Exploratory Data Analysis, like rental peak hours.
def set_derivated_vars(dataset):
customers_rental = dataset.copy()
# create derivate variable: day-to-day and hour change for continuous variables
for var in continuous_var:
customers_rental[var+'_prev_hour_change_pct'] = customers_rental[var].pct_change(periods=1) * 100
# to make positive values means percentage increase of variable in next hour
customers_rental[var+'_next_hour_change_pct'] = customers_rental[var].pct_change(periods=-1)*(-1) * 100
customers_rental[var+'_prev_day_change_pct'] = customers_rental[var].pct_change(periods=24) * 100
customers_rental[var+'_next_day_change_pct'] = customers_rental[var].pct_change(periods=-24) *(-1) * 100
customers_rental['atemp_temp_diff'] = customers_rental['atemp'] - customers_rental['temp']
customers_rental = customers_rental.replace([np.inf, -np.inf], np.nan)
# Replace nan by interpolating, first argument can't be NaN for interpolation to works,
# In Pandas interpolate implementation NaN variable above the range
# will we interpolated as well (like extrapolation)
customers_rental.iloc[0] = customers_rental.iloc[0].fillna(customers_rental.mean())
customers_rental = customers_rental.interpolate(method='time')
return customers_rental
def prepare_data(customers_rental, get_dummy=False, test_data=False, Kaggle = False):
customers_rental['date'] = pd.to_datetime(customers_rental['datetime'])
customers_rental["year"] = customers_rental["date"].dt.year
customers_rental["month"] = customers_rental["date"].dt.month
customers_rental["day"] = customers_rental["date"].dt.day
customers_rental["hour"] = customers_rental["date"].dt.hour
customers_rental["dayofweek"] = customers_rental["date"].dt.dayofweek
customers_rental["weekofyear"] = customers_rental["date"].dt.weekofyear
customers_rental["season_cat"] = customers_rental["season"].map(seasonDict)
customers_rental["month_cat"] = customers_rental["month"].map(monthDict)
customers_rental["dayofweek_cat"] = customers_rental["dayofweek"].map(dayofweekDict)
customers_rental["weather_cat"] = customers_rental["weather"].map(weatherDict)
# next_hour_weather is category for next hour weather
next_hour_weather = customers_rental['weather'].copy()
next_hour_weather[:len(next_hour_weather)-1] = next_hour_weather[1:len(next_hour_weather)]
customers_rental['next_hour_weather'] = next_hour_weather.reset_index()['weather']
customers_rental['quarter'] = 0
customers_rental.loc[(customers_rental['year'] == 2011) & (customers_rental['month'] > 0), 'quarter'] = 1
customers_rental.loc[(customers_rental['year'] == 2011) & (customers_rental['month'] > 3), 'quarter'] = 2
customers_rental.loc[(customers_rental['year'] == 2011) & (customers_rental['month'] > 6), 'quarter'] = 3
customers_rental.loc[(customers_rental['year'] == 2011) & (customers_rental['month'] > 9), 'quarter'] = 4
customers_rental.loc[(customers_rental['year'] == 2012) & (customers_rental['month'] > 0), 'quarter'] = 5
customers_rental.loc[(customers_rental['year'] == 2012) & (customers_rental['month'] > 3), 'quarter'] = 6
customers_rental.loc[(customers_rental['year'] == 2012) & (customers_rental['month'] > 6), 'quarter'] = 7
customers_rental.loc[(customers_rental['year'] == 2012) & (customers_rental['month'] > 9), 'quarter'] = 8
# create additional variable about rental traffic by hour,
# specific hours are taken from Exploratoty Data Analysis from clustermap by the end of next section
customers_rental['hour_cat'] = 0
customers_rental.loc[(customers_rental['hour'] <= 6) | (customers_rental['hour'] == 23), 'hour_cat'] = 1
customers_rental.loc[(customers_rental['hour'] >= 7) & (customers_rental['hour'] <= 8) & \
(customers_rental['workingday'] == 0), 'hour_cat'] = 2
customers_rental.loc[(customers_rental['hour'] >= 7) & (customers_rental['hour'] <= 8) & \
(customers_rental['workingday'] == 1), 'hour_cat'] = 3
customers_rental.loc[((customers_rental['hour'] >= 20) & (customers_rental['hour'] <= 22)) | \
(customers_rental['hour'] == 9), 'hour_cat'] = 4
customers_rental.loc[(customers_rental['hour'] >= 10) & (customers_rental['hour'] <= 16) & \
(customers_rental['workingday'] == 0), 'hour_cat'] = 5
customers_rental.loc[(customers_rental['hour'] >= 10) & (customers_rental['hour'] <= 16) & \
(customers_rental['workingday'] == 1), 'hour_cat'] = 6
customers_rental.loc[(customers_rental['hour'] >= 17) & (customers_rental['hour'] <= 18) & \
(customers_rental['workingday'] == 0), 'hour_cat'] = 7
customers_rental.loc[(customers_rental['hour'] >= 17) & (customers_rental['hour'] <= 18) & \
(customers_rental['workingday'] == 1), 'hour_cat'] = 8
# create additional variable about rental traffic by temp,
# specific hours are taken from Exploratoty Data Analysis from clustermap by the end of next section
customers_rental['temp_cat'] = 0
customers_rental.loc[(customers_rental['temp'].round() <= 9), 'temp_cat'] = 1
customers_rental.loc[(customers_rental['temp'].round() >= 10) & (customers_rental['temp'].round() <= 12) & \
(customers_rental['workingday'] == 0), 'temp_cat'] = 2
customers_rental.loc[(customers_rental['temp'].round() >= 10) & (customers_rental['temp'].round() <= 12) & \
(customers_rental['workingday'] == 1), 'temp_cat'] = 3
customers_rental.loc[(((customers_rental['temp'].round() >= 13) & (customers_rental['temp'].round() <= 15)) |\
((customers_rental['temp'].round() >= 18) & (customers_rental['temp'].round() <= 19))) & \
(customers_rental['workingday'] == 0), 'temp_cat'] = 4
customers_rental.loc[(((customers_rental['temp'].round() >= 13) & (customers_rental['temp'].round() <= 15)) |\
((customers_rental['temp'].round() >= 18) & (customers_rental['temp'].round() <= 19))) & \
(customers_rental['workingday'] == 1), 'temp_cat'] = 5
customers_rental.loc[(((customers_rental['temp'].round() >= 16) & (customers_rental['temp'].round() <= 17)) |\
(customers_rental['temp'].round() == 22)), 'temp_cat'] = 6
customers_rental.loc[(((customers_rental['temp'].round() >= 20) & (customers_rental['temp'].round() <= 21)) |\
((customers_rental['temp'].round() >= 23) & (customers_rental['temp'].round() <= 29))) & \
(customers_rental['workingday'] == 0), 'temp_cat'] = 7
customers_rental.loc[(((customers_rental['temp'].round() >= 20) & (customers_rental['temp'].round() <= 21)) |\
((customers_rental['temp'].round() >= 23) & (customers_rental['temp'].round() <= 29))) & \
(customers_rental['workingday'] == 1), 'temp_cat'] = 8
customers_rental.loc[(customers_rental['temp'].round() >= 30) & (customers_rental['temp'].round() <= 38), \
'temp_cat'] = 9
customers_rental.loc[(customers_rental['temp'].round() >= 39), 'temp_cat'] = 10
# from EDA
customers_rental['bike_season'] = 0
customers_rental.loc[(customers_rental['month'] >= 4) & (customers_rental['month'] <= 11), 'bike_season'] = 1
# data taken from independent source weather
customers_rental.loc[(customers_rental['year'] == 2011) & (customers_rental['month'] == 4) \
& (customers_rental['day'] == 15), "workingday"] = 1
customers_rental.loc[(customers_rental['year'] == 2012) & (customers_rental['month'] == 4) \
& (customers_rental['day'] == 16), "workingday"] = 1
customers_rental.loc[(customers_rental['year'] == 2011) & (customers_rental['month'] == 4) \
& (customers_rental['day'] == 15), "holiday"] = 1
customers_rental.loc[(customers_rental['year'] == 2012) & (customers_rental['month'] == 4) \
& (customers_rental['day'] == 16), "holiday"] = 1
customers_rental.loc[(customers_rental['year'] == 2011) & (customers_rental['month'] == 11) \
& (customers_rental['day'] == 25), "holiday"] = 1
customers_rental.loc[(customers_rental['year'] == 2012) & (customers_rental['month'] == 11) \
& (customers_rental['day'] == 23), "holiday"] = 1
customers_rental.loc[(customers_rental['year'] == 2011) & (customers_rental['month'] == 11) \
& (customers_rental['day'] == 25), "workingday"] = 0
customers_rental.loc[(customers_rental['year'] == 2012) & (customers_rental['month'] == 11) \
& (customers_rental['day'] == 23), "workingday"] = 0
customers_rental.loc[(customers_rental['year'] == 2012) & (customers_rental['month'] == 10) \
& (customers_rental['day'] == 30), "holiday"] = 1
customers_rental.loc[(customers_rental['month'] == 12) \
& ((customers_rental['day'] == 24) | \
(customers_rental['day'] == 26)), "workingday"] = 0
customers_rental.loc[(customers_rental['month'] == 12) \
& ((customers_rental['day'] == 24) | \
(customers_rental['day'] == 25) | \
(customers_rental['day'] == 26)), "holiday"] = 1
#storms
customers_rental.loc[(customers_rental['month'] == 2012) & (customers_rental['month'] == 5) \
& (customers_rental['day'] == 21), "holiday"] = 1
#tornado
customers_rental.loc[(customers_rental['month'] == 2012) & (customers_rental['month'] == 6) \
& (customers_rental['day'] == 1), "holiday"] = 1
# create dummy variables (if needed)
if (get_dummy):
customers_rental = pd.get_dummies(data=customers_rental, columns=categorical_var, drop_first=True)
dt = pd.DatetimeIndex(customers_rental['datetime'])
customers_rental.set_index(dt, inplace=True)
# first day don't have pct_change, as well as there are some divide by zero operation (inf)
# create derivated variables, used among others in outliers detecion
customers_rental = set_derivated_vars(customers_rental)
customers_rental['train'] = 0
customers_rental.loc[(customers_rental['day'] < 20), 'train'] = 1
# Apache Spark solver uses square loss function for minimalization.
# Our evalutation metric will be RMSLE so it's good idea to use logarithmic transformation
# of dependent cols (adding 1 first so that 0 values don't become -inf)
for col in dependent_variables:
customers_rental['%s_log' % col] = np.log(customers_rental[col] + 1)
# drop unnecessary variables
customers_rental.drop(['datetime','date'], axis=1, inplace=True)
return customers_rental
We will use dataset with dummy variables for linear regression We will use dataset without dummy varaibles for random forest regression and gradient boosted regression.
all_dummy = prepare_data(train_df.copy().append(test_df.copy()), get_dummy=True)
all_no_dummy = prepare_data(train_df.copy().append(test_df.copy()), get_dummy=False)
Columns structure (without dummies) after data preparation part. Later we will explore which set of those columns use for machine learning models.
all_no_dummy.info()
Let's explore the data!
sns.set_palette("coolwarm")
sns.set_style('whitegrid')
Median of continuous variables looks reasonably. There are many outliers in windspeed variable and some outliers for zero humidity.
fig,(ax1,ax2)= plt.subplots(ncols=2, figsize=(12,4))
sns.boxplot(data=all_no_dummy[continuous_var], ax=ax1)
sns.boxplot(data=all_no_dummy[all_no_dummy['train'] == 1][dependent_variables], ax=ax2)
Explore data of daily average of one hour bike rentals across all years 2011 and 2012. Sliding windows is of interval 14 days.
outliers = {}
def plot_with_window(column, dataset=all_no_dummy, all=True, threshold=75):
if (all):
day_avg = 3
data = dataset
data_window_mean = data.rolling(window=day_avg * 24).mean()
data_window_std = data.rolling(window=day_avg * 24).std()
else:
day_avg = 7
data = dataset[all_no_dummy['train'] == 1].groupby(['year','month','day']).mean()
data_window_mean = data.rolling(window=day_avg).mean()
data_window_std = data.rolling(window=day_avg).std()
diff = np.abs(data[column] - data_window_mean[column])
# diff for outliers candidates has to be at least above threshold % to avoid false outliers
outliers = diff[(diff > 3 * data_window_std[column]) & (diff > threshold)] #.index.values.tolist()
plt.figure(figsize=(9,4))
data[column].plot()
data_window_mean[column].plot(label=str(day_avg) + ' Day Avg', lw=1, ls='--', c='red')
plt.legend(loc='best')
plt.ylabel(column)
plt.title('Daily mean of ' + column + ' over time')
plt.tight_layout()
return outliers
def unique_outliers(col_list):
outliers_list = []
for col in col_list:
for timestamp in outliers[col].index:
outliers_list.append(timestamp)
return sorted(list(set(outliers_list)))
Characterics of casual renting is very changeable with many positive peaks.
for col in dependent_variables:
outliers[col] = plot_with_window(col, all=False)
Plot for registred users looks almost the same becouse on average about 80% of total rentals are made be registered users.
all_no_dummy[all_no_dummy['train'] == 1][['casual', 'registered', 'count']].describe().ix['mean']
Thera are no outliers for dependent variables (by our criteria).
all_no_dummy.ix[unique_outliers(dependent_variables)]
Let's find out whether there are some weather data outliers.
temp_var_list = ['temp',
'temp_prev_hour_change_pct',
'temp_next_hour_change_pct',
'temp_prev_day_change_pct',
'temp_next_day_change_pct']
for col in temp_var_list:
outliers[col] = plot_with_window(col)
Some of the outliers below.
all_no_dummy.ix[unique_outliers(temp_var_list), temp_var_list].head()
atemp_var_list = ['atemp',
'atemp_prev_hour_change_pct',
'atemp_next_hour_change_pct',
'atemp_prev_day_change_pct',
'atemp_next_day_change_pct']
for col in atemp_var_list:
outliers[col] = plot_with_window(col)
all_no_dummy.ix[unique_outliers(atemp_var_list)][atemp_var_list].head()
outliers['atemp_temp_diff'] = plot_with_window('atemp_temp_diff', threshold=10)
all_no_dummy.ix[unique_outliers(['atemp_temp_diff'])]['atemp_temp_diff'].head(400)
We can see one clear outlier in humidity. During minium day for count variable (2011, 3, 6), humidity had one of its highest value. It matches our understanding - there are fewer bike rentals during rain. On (2011, 3, 10) humidity has zero daily mean value although in adjacent days there was high value for humidity. It looks like some missing values.
all_no_dummy.groupby(['year','month','day']).mean().idxmin()
humidity_var_list = ['humidity',
'humidity_prev_hour_change_pct',
'humidity_next_hour_change_pct',
'humidity_prev_day_change_pct',
'humidity_next_day_change_pct']
for col in humidity_var_list:
outliers[col] = plot_with_window(col)
all_no_dummy.ix[unique_outliers(humidity_var_list)][humidity_var_list].head()
Windspeed becouse of its nature has very inregular characteriscs but in a way it has some stable mean level. Looks like this variable doesn't add value for our predictive models.
windspeed_var_list = ['windspeed',
'windspeed_prev_hour_change_pct',
'windspeed_next_hour_change_pct',
'windspeed_prev_day_change_pct',
'windspeed_next_day_change_pct']
for col in windspeed_var_list:
outliers[col] = plot_with_window(col)
all_no_dummy.ix[unique_outliers(windspeed_var_list)][windspeed_var_list].head()
len(unique_outliers(humidity_var_list + temp_var_list + atemp_var_list + windspeed_var_list + ['atemp_temp_diff']))
def interpolate_data(dataset):
data = dataset.copy()
for col_list in [humidity_var_list] + [temp_var_list] + [atemp_var_list] + [windspeed_var_list]:
for col in col_list:
for timestamp in outliers[col].index:
data.loc[timestamp, col_list] = np.NaN
# avoid first row NaN
data.iloc[0] = data.iloc[0].fillna(data.mean())
data = data.interpolate(method='time', axis=0)
# after interpolating continuous varialbes we must recompute derivated variables
data = set_derivated_vars(data)
return data
all_no_dummy_interpolate = interpolate_data(all_no_dummy.copy())
all_dummy_interpolate = interpolate_data(all_dummy.copy())
Difference between original and interpolated data.
(all_no_dummy != all_no_dummy_interpolate).sum()
Some exmaple:
2012-01-10 05:00:00 - there were outliers for temp (16.40) and atemp (20.46). Those values were interpolated to (12.77) and (15.26) respectively.
all_no_dummy.loc[('2012-01-10 02:00:00'):('2012-01-10 07:00:00')][continuous_var]
all_no_dummy_interpolate.ix[('2012-01-10 02:00:00'):('2012-01-10 07:00:00')][continuous_var]
From the boxplot below its clear most bike rentals are in hours: 8, 17, 18. We should expect that registred users rent bikes in those hours becouse there are almost no outliers. Between those hours there are hours with many outliers. We should expect that more casual users rent bikes in those hours
plt.figure(figsize=(10,6))
sns.boxplot(x="hour", y="count", data=all_no_dummy[all_no_dummy['train'] == 1])
plt.tight_layout
On the plots below we can see rentals distribution for specifc month. Druing summer there are most demand for bikes. During all season hour-rental characteristics have the same shape: for working days thera are two peaks on 8 and 17-18 hours. Casual users rent bike usually between those hours as well as during weekends.
# below code is taken from https://www.kaggle.com/viveksrinivasan/eda-ensemble-model-top-10-percentile
fig,(ax1,ax2,ax3,ax4)= plt.subplots(nrows=4, figsize=(10,20))
#fig.set_size_inches(12,15)
sortOrder = ["January","February","March","April","May","June","July","August","September","October","November","December"]
hueOrder = ["Monday","Tuesday","Wednesday","Thursday","Friday","Saturday","Sunday"]
monthAggregated = pd.DataFrame(all_no_dummy[all_no_dummy['train'] == 1].groupby("month_cat")["count"].mean()).reset_index()
monthSorted = monthAggregated.sort_values(by="count",ascending=False)
sns.barplot(data=monthSorted,x="month_cat",y="count",ax=ax1,order=sortOrder)
ax1.set(xlabel='Month', ylabel='Avearage Count',title="Average Count By Month")
hourAggregated = pd.DataFrame(all_no_dummy[all_no_dummy['train'] == 1].groupby(["hour","season_cat"],sort=True)["count"].mean()).reset_index()
sns.pointplot(x=hourAggregated["hour"], y=hourAggregated["count"],hue=hourAggregated["season_cat"], data=hourAggregated, join=True,ax=ax2)
ax2.set(xlabel='Hour Of The Day', ylabel='Users Count',title="Average Users Count By Hour Of The Day Across Season",label='big')
hourAggregated = pd.DataFrame(all_no_dummy[all_no_dummy['train'] == 1].groupby(["hour","dayofweek_cat"],sort=True)["count"].mean()).reset_index()
sns.pointplot(x=hourAggregated["hour"], y=hourAggregated["count"],hue=hourAggregated["dayofweek_cat"],hue_order=hueOrder, data=hourAggregated, join=True,ax=ax3)
ax3.set(xlabel='Hour Of The Day', ylabel='Users Count',title="Average Users Count By Hour Of The Day Across Weekdays",label='big')
hourTransformed = pd.melt(all_no_dummy[all_no_dummy['train'] == 1][["hour","casual","registered"]], id_vars=['hour'], value_vars=['casual', 'registered'])
hourAggregated = pd.DataFrame(hourTransformed.groupby(["hour","variable"],sort=True)["value"].mean()).reset_index()
sns.pointplot(x=hourAggregated["hour"], y=hourAggregated["value"],hue=hourAggregated["variable"],hue_order=["casual","registered"], data=hourAggregated, join=True,ax=ax4)
ax4.set(xlabel='Hour Of The Day', ylabel='Users Count',title="Average Users Count By Hour Of The Day Across User Type",label='big')
plt.subplots_adjust(hspace = 0.3)
plt.tight_layout
Let's find out what is distribution curves for dependent variables. Most of the current machine learning algorithmics performs best on normally distributed data. Pure distribution of depednent variables look more to be Poisson than Gaussian. Log transformation can helps a lot in converting data to normal-like distribution.
fig,axes= plt.subplots(nrows=3, ncols=2, figsize=(11,10))
sns.distplot(all_no_dummy[all_no_dummy['train'] == 1]['count'], ax=axes[0][0])
sns.distplot(all_no_dummy[all_no_dummy['train'] == 1]['count_log'], ax=axes[0][1])
sns.distplot(all_no_dummy[all_no_dummy['train'] == 1]['casual'], ax=axes[1][0])
sns.distplot(all_no_dummy[all_no_dummy['train'] == 1]['casual_log'], ax=axes[1][1])
sns.distplot(all_no_dummy[all_no_dummy['train'] == 1]['registered'], ax=axes[2][0])
sns.distplot(all_no_dummy[all_no_dummy['train'] == 1]['registered_log'], ax=axes[2][1])
plt.subplots_adjust(hspace = 0.3)
plt.tight_layout
There are not clear linear relationships between variables, besides dependent variables (casual, registerd, count) themselves and temp - atemp. It indicates that machine learning regression algorithms that can handle non-linearity could perfom better than linear regression.
sns.pairplot(all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1], vars=continuous_var + dependent_variables, hue='holiday', palette='coolwarm')
There is some correlation (showed on some next cells on heatmap) between casual and registred users rentals.
sns.pairplot(all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1], vars=['casual', 'registered'], palette='coolwarm')
It's clear linear relationships between temperature in Celsius and "feels like" temperature in Celsius'. We should drop one of them to avoid multicollinearity
sns.jointplot(x='temp',y='atemp',data=all_no_dummy_interpolate)
There are potentially 25 outliers where absolute difference between temperature in Celsius and "feels like" temperature in Celsius is greater than 5 Celsius degree. We can build another linear regression model to predict atemp for those outliers. But taking into account fact, there are only 24 records, we just simply set mean() on atemp_temp_diff variable for those records and we will drop atemp variable as well.
sns.jointplot(x='temp',y='atemp_temp_diff',data=all_no_dummy_interpolate)
all_no_dummy_interpolate[all_no_dummy_interpolate['atemp_temp_diff'] < -10][continuous_var + ['atemp_temp_diff']]
all_no_dummy_interpolate.loc[all_no_dummy_interpolate['atemp_temp_diff'] < -10 , \
['atemp','atemp_temp_diff']] = np.NaN
all_no_dummy_interpolate = all_no_dummy_interpolate.interpolate(method='time')
all_no_dummy_interpolate['atemp_temp_diff'] = all_no_dummy_interpolate['atemp'] - all_no_dummy_interpolate['temp']
Right now there are no outliers for atemp_temp_diff variable.
all_no_dummy_interpolate[all_no_dummy_interpolate['atemp_temp_diff'] < -10][continuous_var + ['atemp_temp_diff']]
TRAIN data set - categorical variables.
fig,axes= plt.subplots(nrows=4, ncols=2, figsize=(11,10))
sns.countplot(x='holiday',data=all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1], ax=axes[0][0])
sns.countplot(x='season_cat',data=all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1], ax=axes[0][1])
sns.countplot(x='workingday',data=all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1], ax=axes[1][0])
sns.countplot(x='weather_cat',data=all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1], ax=axes[1][1])
sns.countplot(x='dayofweek',data=all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1], ax=axes[2][0])
sns.countplot(x='month',data=all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1], ax=axes[2][1])
sns.countplot(x='hour',data=all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1], ax=axes[3][0])
sns.countplot(x='temp_cat',data=all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1], ax=axes[3][1])
plt.subplots_adjust(hspace = 0.3)
plt.tight_layout
TEST data set - categorical variables.
fig,axes= plt.subplots(nrows=4, ncols=2, figsize=(11,10))
sns.countplot(x='holiday',data=all_no_dummy_interpolate[all_no_dummy_interpolate['train'] != 1], ax=axes[0][0])
sns.countplot(x='season_cat',data=all_no_dummy_interpolate[all_no_dummy_interpolate['train'] != 1], ax=axes[0][1])
sns.countplot(x='workingday',data=all_no_dummy_interpolate[all_no_dummy_interpolate['train'] != 1], ax=axes[1][0])
sns.countplot(x='weather_cat',data=all_no_dummy_interpolate[all_no_dummy_interpolate['train'] != 1], ax=axes[1][1])
sns.countplot(x='dayofweek',data=all_no_dummy_interpolate[all_no_dummy_interpolate['train'] != 1], ax=axes[2][0])
sns.countplot(x='month',data=all_no_dummy_interpolate[all_no_dummy_interpolate['train'] != 1], ax=axes[2][1])
sns.countplot(x='hour',data=all_no_dummy_interpolate[all_no_dummy_interpolate['train'] != 1], ax=axes[3][0])
sns.countplot(x='temp_cat',data=all_no_dummy_interpolate[all_no_dummy_interpolate['train'] != 1], ax=axes[3][1])
plt.subplots_adjust(hspace = 0.3)
plt.tight_layout
Mainly difference between Train and Test caterogical variables distribution is a smaller amount of rows in February for test data set - it's due to fact in 2011 February had 28 days and in 2012 had 29 days.
TRAIN data set - continuous variables.
fig,axes= plt.subplots(nrows=2, ncols=2, figsize=(11,10))
sns.distplot(all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1]['temp'], bins=16, ax=axes[0][0])
sns.distplot(all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1]['atemp'], bins=16, ax=axes[0][1])
sns.distplot(all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1]['humidity'], bins=16, ax=axes[1][0])
sns.distplot(all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1]['windspeed'], bins=16, ax=axes[1][1])
plt.subplots_adjust(hspace = 0.3)
plt.tight_layout
fig,axes= plt.subplots(nrows=2, ncols=2, figsize=(11,10))
sns.distplot(all_no_dummy_interpolate[all_no_dummy_interpolate['train'] != 1]['temp'], bins=16, ax=axes[0][0])
sns.distplot(all_no_dummy_interpolate[all_no_dummy_interpolate['train'] != 1]['atemp'], bins=16, ax=axes[0][1])
sns.distplot(all_no_dummy_interpolate[all_no_dummy_interpolate['train'] != 1]['humidity'], bins=16, ax=axes[1][0])
sns.distplot(all_no_dummy_interpolate[all_no_dummy_interpolate['train'] != 1]['windspeed'], bins=16, ax=axes[1][1])
plt.subplots_adjust(hspace = 0.3)
plt.tight_layout
We can see that both data sets have very similar distribution for continuous variablers so overall there is no need to select specific subset from training data to train models on.
all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1].corr().ix[dependent_variables]
Heatmap is better visualisation tool to see some Pearson Correlation. There aren't strong correlations: hour and temp variable have approximately 0.4 Pearson Correlation value.
fig = plt.figure(figsize=(20,12))
sns.heatmap(all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1].corr(), annot=False,cmap='coolwarm')
plt.tight_layout
We can see that linear regression can give us only roughly approximation of total rentals from hour variable.
sns.jointplot(x='hour',y='count',kind='reg',data=all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1])
Temp-Count scatter plot is skewed in right direction which indicates that higher temperature results in higher total bike rentals. But again, we can see it's only roughly approximation.
sns.jointplot(x='temp',y='count',kind='reg',data=all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1])
sns.jointplot(x='temp',y='count',kind='hex',data=all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1])
Let's have a close look on ditribution across different years.
train_no_dummy_year_piv = all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1].pivot_table(values='count',index='month',columns='year').rename(index=monthDict)
train_no_dummy_year_piv
It looks like there are much more rentals in 2012 than in 2011. Perhaps there were more bikes to rent in 2012 than in 2011. Another reason could be fact that people got used to rent a bike from Capital BikeShare and there were more registred users in total.
fig = plt.figure(figsize=(7,5))
sns.heatmap(train_no_dummy_year_piv,cmap='coolwarm')
train_no_dummy_month_piv = all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1].pivot_table(values='count',index='hour',columns='month').rename(columns=monthDict)
We can see that from April to November there is a bike season.
fig = plt.figure(figsize=(11,8))
sns.heatmap(train_no_dummy_month_piv,cmap='coolwarm')
train_no_dummy_workingday_piv = all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1].pivot_table(values='count',index='hour',columns='workingday')
sns.clustermap(train_no_dummy_workingday_piv,cmap='coolwarm',standard_scale=1,method='weighted')
From the plot clustermap above we can devide hours into 8 categories:
We added those categories in variable hour_cat.
train_no_dummy_year_temp_piv = all_no_dummy_interpolate[all_no_dummy_interpolate['train'] == 1].round().pivot_table(values='count',index='temp',columns='workingday').dropna()
sns.clustermap(train_no_dummy_year_temp_piv,cmap='coolwarm',standard_scale=1,method='average')
From the plot clustermap above we can devide rounded temp into 10 categories:
We added those categories in variable temp_cat.
Here are three common evaluation metrics for regression problems:
Mean Absolute Error (MAE) is the mean of the absolute value of the errors:
$$\frac 1n\sum_{i=1}^n|y_i-\hat{y}_i|$$Mean Squared Error (MSE) is the mean of the squared errors:
$$\frac 1n\sum_{i=1}^n(y_i-\hat{y}_i)^2$$Root Mean Squared Error (RMSE) is the square root of the mean of the squared errors:
$$\sqrt{\frac 1n\sum_{i=1}^n(y_i-\hat{y}_i)^2}$$Root Mean Squared Log Error (RMSLE) is the square root of the mean of the squared log errors:
$$\sqrt{\frac 1n\sum_{i=1}^n(log(y_i+1)-log(\hat{y}_i+1))^2}$$Comparing these metrics:
All of these are loss functions, because we want to minimize them.
def evaluatorMAE_own(predictions, labelCol):
diff = np.abs(predictions[labelCol] - predictions['prediction'])
mean_error = diff.mean()
return mean_error
def evaluatorMSE_own(predictions, labelCol):
diff = predictions[labelCol] - predictions['prediction']
mean_error = np.square(diff).mean()
return mean_error
def evaluatorRMSE_own(predictions, labelCol):
diff = predictions[labelCol] - predictions['prediction']
mean_error = np.square(diff).mean()
return np.sqrt(mean_error)
def evaluatorRMSLE_own(predictions, labelCol):
diff = np.log(predictions[labelCol] + 1) - np.log(predictions['prediction'] + 1)
mean_error = np.square(diff).mean()
return np.sqrt(mean_error)
def evaluatorR2_own(predictions, labelCol):
SS_res = np.sum(np.square(predictions[labelCol] - predictions['prediction']))
SS_tot = np.sum(np.square(predictions[labelCol] - predictions[labelCol].mean()))
R2 = 1 - SS_res/SS_tot
return R2
def undo_log_transform(pred,labelCol, Kaggle=False):
predictions = pred.copy()
# Kaggle test data doesn't have labels
if (not Kaggle):
predictions[labelCol] = np.exp(predictions[labelCol]) - 1
predictions['prediction'] = np.exp(predictions['prediction']) - 1
return predictions
def evaluateMetrics(predictions, labelCol, rollback=True, rounded=True):
if (rollback):
predictions = undo_log_transform(predictions, labelCol)
if (rounded):
predictions = np.round(predictions)
print("MAE: " + str('%.4f' % evaluatorMAE_own(predictions, labelCol)))
print("MSE: " + str('%.4f' % evaluatorMSE_own(predictions, labelCol)))
print("RMSE: " + str('%.4f' % evaluatorRMSE_own(predictions, labelCol)))
print("R2: " + str('%.4f' % evaluatorR2_own(predictions, labelCol)))
print("----------------")
print("RMSLE: " + str('%.4f' % evaluatorRMSLE_own(predictions, labelCol)))
print("----------------")
return evaluatorRMSLE_own(predictions, labelCol)
Drop unnecessary added category variables from training dataset. Apache Spark needs all data to be numerical. We use 'day' variable taking into account that there will be different range of this variable in train and test set, but study showed that droping this variable didn't increase the results. We drop atemp and leave temp to avoid multicollinearity.
# drop_regularization = ['atemp','atemp_hour_change', 'atemp_day_change', \
# 'windspeed','windspeed_hour_change', 'windspeed_day_change']
#drop_regularization = ['holiday', 'atemp', 'windspeed', 'temp_day_change', 'atemp_hour_change', 'atemp_day_change', 'windspeed_hour_change', 'atemp_temp_diff']
drop_regularization = []
train_dummy_final = train_dummy.drop(added_cat_var + dependent_variables + drop_regularization, axis=1).copy()
train_no_dummy_final = train_no_dummy.drop(added_cat_var + dependent_variables + drop_regularization , axis=1).copy()
test_dummy_KAGGLE_final = test_dummy_KAGGLE.drop(added_cat_var + drop_regularization, axis=1).copy()
test_no_dummy_KAGGLE_final = test_no_dummy_KAGGLE.drop(added_cat_var + drop_regularization , axis=1).copy()
Split trainind data into train and "test" to evaluate our models. Final model will predict labels for test_KAGGLE datasets.
from sklearn.model_selection import train_test_split
(trainingData_dummy, testData_dummy) = train_test_split(train_dummy_final, test_size=0.2, random_state=101)
(trainingData_no_dummy, testData_no_dummy) = train_test_split(train_no_dummy_final, test_size=0.2, random_state=101)
Create Apache Spark Data Frames
spark_train_dummy = sqlContext.createDataFrame(trainingData_dummy)
spark_test_dummy = sqlContext.createDataFrame(testData_dummy)
spark_train_no_dummy = sqlContext.createDataFrame(trainingData_no_dummy)
spark_test_no_dummy = sqlContext.createDataFrame(testData_no_dummy)
Prepare Kaggle test data.
spark_Kaggle_test_dummy = sqlContext.createDataFrame(test_dummy_KAGGLE_final)
spark_Kaggle_test_no_dummy = sqlContext.createDataFrame(test_no_dummy_KAGGLE_final)
Apache Spark Machine Learning lib requires input as data frame transformed to labeled point.
from pyspark.ml.linalg import Vectors
DUMMY = True
dummy_cols = spark_train_dummy.columns
no_dummy_cols = spark_train_no_dummy.columns
kaggle_dummy_cols = spark_Kaggle_test_dummy.columns
kaggle_no_dummy_cols = spark_Kaggle_test_no_dummy.columns
def transformToLabeledPoint(row) :
retArray=[]
if (DUMMY):
spark_col = dummy_cols
else:
spark_col = no_dummy_cols
# remove dependent variables (before log transformation) from the middle of independent variables
spark_col = [x for x in spark_col if x not in dependent_variables]
# 3 last items are dependent variables
for col in spark_col[:-3]:
retArray.append(row[col])
label_count = row["count_log"]
label_registered = row["registered_log"]
label_casual = row["casual_log"]
return label_count, label_registered, label_casual, Vectors.dense(retArray)
def transformToLabeledPointKaggle(row) :
retArray=[]
if (DUMMY):
spark_col = kaggle_dummy_cols
else:
spark_col = kaggle_no_dummy_cols
# first column is datetime
for col in spark_col[1:]:
retArray.append(row[col])
datetime = row["datetime"]
# lables must match this from transformToLabeledPoint for further unionAll() operation
return datetime,0,0, Vectors.dense(retArray)
"""-------------------------------------------------------------------------------
Hypothesis Test Using Pearson’s Chi-squared Test Algorithm // Spark2 MLLib only
-------------------------------------------------------------------------------"""
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.stat import Statistics
from pyspark.mllib.linalg import Matrices, Vectors
DUMMY = False
def transformToLabeledPointMLLib(row) :
retArray=[]
if (DUMMY):
spark_col = dummy_cols
else:
spark_col = no_dummy_cols
# remove dependent variables (before log transformation) from the middle of independent variables
spark_col = [x for x in spark_col if x not in dependent_variables]
# 3 last items are dependent variables
for col in spark_col[:-3]:
retArray.append(row[col])
label_count = row["count_log"]
label_registered = row["registered_log"]
label_casual = row["casual_log"]
#For verification if p-value for label itsefl is zero:
#retArray.append(int(float(row["count_log"])))
retArray.append(row["count_log"])
lp = LabeledPoint(row["count_log"], retArray)
return lp
unionLpMLLib = spark_train_no_dummy.unionAll(spark_test_no_dummy).rdd.map(transformToLabeledPointMLLib)
featureTestResults = Statistics.chiSqTest(unionLpMLLib)
for i, result in enumerate(featureTestResults):
print("Column %d:\n%s" % (i + 1, result))
for i in range(0,len(featureTestResults)):
print(no_dummy_cols[i] + ": " + str(round(featureTestResults[i].pValue,4)))
no_dummy_cols
Preparation Apache Spark data frame for linear regression
DUMMY = True
trainingData_dummy = sqlContext.createDataFrame(spark_train_dummy.rdd.map(transformToLabeledPoint), ["label_count", "label_registered", "label_casual", "features"])
testData_dummy = sqlContext.createDataFrame(spark_test_dummy.rdd.map(transformToLabeledPoint), ["label_count", "label_registered", "label_casual", "features"])
testKaggle_dummy = sqlContext.createDataFrame(spark_Kaggle_test_dummy.rdd.map(transformToLabeledPointKaggle), ["label_count", "label_registered", "label_casual", "features"])
train_no_dummy.info()
Preparation Apache Spark data frame for random forest and gradient boosted regression. No need for dummy variables.
DUMMY = False
trainingData_no_dummy = sqlContext.createDataFrame(spark_train_no_dummy.rdd.map(transformToLabeledPoint), ["label_count", "label_registered", "label_casual", "features"])
testData_no_dummy = sqlContext.createDataFrame(spark_test_no_dummy.rdd.map(transformToLabeledPoint), ["label_count", "label_registered", "label_casual", "features"])
testKaggle_no_dummy = sqlContext.createDataFrame(spark_Kaggle_test_no_dummy.rdd.map(transformToLabeledPointKaggle), ["label_count", "label_registered", "label_casual","features"])
Cache spark data frames into memory for faster computation
trainingData_dummy.cache()
trainingData_dummy.count()
trainingData_no_dummy.cache()
trainingData_no_dummy.count()
testData_dummy.cache()
testData_dummy.count()
testData_no_dummy.cache()
testData_no_dummy.count()
testKaggle_dummy.cache()
testKaggle_dummy.count()
testKaggle_no_dummy.cache()
testKaggle_no_dummy.count()
Now its time to train and tune our model on training data using k-fold cross validation method!
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
As mentioned before, LinearRegression for good performance needs data with dummy variables.
def make_prediction(models_dict, dummy=False, Kaggle=False):
# Kaggle test data is used only with no_dummy
if (dummy):
if (Kaggle):
test_data = testKaggle_dummy
else:
test_data = testData_dummy
elif (Kaggle):
test_data = testKaggle_no_dummy
else:
test_data = testData_no_dummy
predictionsTestData_r = models_dict['label_registered'].transform(test_data)
predictionsTestData_c = models_dict['label_casual'].transform(test_data)
predictionsTestData_count = models_dict['label_count'].transform(test_data)
return predictionsTestData_r, predictionsTestData_c, predictionsTestData_count
Function for evaluation metrics, support both functionalities: rounded predictions and not.
def evaluate_prediction(predictionsTestData_r, predictionsTestData_c, predictionsTestData_count, rounded=False):
print("Evaluation prediction for registred users:")
pandas_pred_r = predictionsTestData_r.toPandas()
pandas_pred_r.loc[(pandas_pred_r['prediction'] < 0), 'prediction'] = 0
rmsle_r = evaluateMetrics(pandas_pred_r, 'label_registered', rounded=rounded)
print()
print("Evaluation prediction for casual users:")
pandas_pred_c = predictionsTestData_c.toPandas()
pandas_pred_c.loc[(pandas_pred_c['prediction'] < 0), 'prediction'] = 0
rmsle_c = evaluateMetrics(pandas_pred_c, 'label_casual', rounded=rounded)
print()
print("Evaluation prediction for sum of both models: registred + casual users:")
pandas_pred_c_undo_log = undo_log_transform(pandas_pred_c,'label_casual')
pandas_pred_c_undo_log.loc[(pandas_pred_c_undo_log['prediction'] < 0), 'prediction'] = 0
pandas_pred_r_undo_log = undo_log_transform(pandas_pred_r,'label_registered')
pandas_pred_r_undo_log.loc[(pandas_pred_r_undo_log['prediction'] < 0), 'prediction'] = 0
pandas_pred_sum = pd.DataFrame()
pandas_pred_sum['label_count'] = pandas_pred_c_undo_log['label_casual'] + pandas_pred_r_undo_log['label_registered']
pandas_pred_sum['prediction'] = pandas_pred_c_undo_log['prediction'] + pandas_pred_r_undo_log['prediction']
rmsle_sum = evaluateMetrics(pandas_pred_sum, 'label_count', rollback=False, rounded=rounded)
print()
print("Evaluation prediction for one count users model:")
pandas_pred_count = predictionsTestData_count.toPandas()
pandas_pred_count.loc[(pandas_pred_count['prediction'] < 0), 'prediction'] = 0
rmsle_count = evaluateMetrics(pandas_pred_count, 'label_count', rounded=rounded)
# We drop features variable in case to mix prediction from dummy and no_dummy models;
# dummy features vector has much more positions than no_dummy, so there would be problem to connect both.
# We just won't need features vector any more.
prediction_dict = {'registered' : undo_log_transform(pandas_pred_r.drop(['features'], axis = 1).copy(),'label_registered'),
'casual': undo_log_transform(pandas_pred_c.drop(['features'], axis = 1).copy(),'label_casual'),
'sum': pandas_pred_sum.copy(),
'count': undo_log_transform(pandas_pred_count.drop(['features'], axis = 1).copy(),'label_count')}
return prediction_dict
Function for mixing predictions from two different models together with respect to specifc ratio rate.
def evaluate_mixed_prediction(rf_pred_dict, bgtr_pred_dict, ratio=0.5, rounded=False):
print("Evaluation mixed (ratio=" + str(ratio) + ") prediction for registred users:")
mixed_pandas_r = ratio * rf_pred_dict['registered'] + (1.0 - ratio) * bgtr_pred_dict['registered']
rmsle_r = evaluateMetrics(mixed_pandas_r, 'label_registered', rollback=False, rounded=rounded)
print()
print("Evaluation mixed (ratio=" + str(ratio) + ") prediction for casual users:")
mixed_pandas_c = ratio * rf_pred_dict['casual'] + (1.0 - ratio) * bgtr_pred_dict['casual']
rmsle_c = evaluateMetrics(mixed_pandas_c, 'label_casual', rollback=False, rounded=rounded)
print()
print("Evaluation mixed (ratio=" + str(ratio) + ") prediction for sum of both models: registred + casual users:")
mixed_pandas_sum = ratio * rf_pred_dict['sum'] + (1.0 - ratio) * bgtr_pred_dict['sum']
rmsle_sum = evaluateMetrics(mixed_pandas_sum, 'label_count', rollback=False, rounded=rounded)
print()
print("Evaluation mixed (ratio=" + str(ratio) + ") prediction for one count users model:")
mixed_pandas_count = ratio * rf_pred_dict['count'] + (1.0 - ratio) * bgtr_pred_dict['count']
rmsle_count = evaluateMetrics(mixed_pandas_count, 'label_count', rollback=False, rounded=rounded)
mixed_prediction_dict = {'registered' : mixed_pandas_r.copy(),
'casual': mixed_pandas_c.copy(),
'sum': mixed_pandas_sum.copy(),
'count': mixed_pandas_count.copy(),
'rmsle_sum' : rmsle_sum}
return mixed_prediction_dict
Function for final prediction for Kaggle test dataset. Support mixing prediction with specific ratio. Final sum of predictions is rounded.
def predict_Kaggle_test(rf_predictionsTestData_r, rf_predictionsTestData_c, \
gbtr_predictionsTestData_r, gbtr_predictionsTestData_c, ratio=0.5):
# 1. Although Kaggle test dataset does not have labels, all prediction are Spark dataframes
# with schema ["label_count", "label_registered", "label_casual", "features"]
# for coherent format with training data which enables us to use Spark featureIndexer during traingin models
# featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=24).fit(trainingData_no_dummy.unionAll(testData_no_dummy).unionAll(testKaggle_no_dummy))
#
# 2. We used column 'label_count' for convenient cache datatime variable used for submission file format
# pandas_pred_sum.rename(columns={"label_count": "datetime"})
#
# 3. Predicted values have to be numerical
# 4. labelCol is set to '' becouse Kaggle test dataset doesn't have one
rf_pandas_pred_r = undo_log_transform(rf_predictionsTestData_r.toPandas().drop(['features'], axis = 1), labelCol='', Kaggle=True)
rf_pandas_pred_c = undo_log_transform(rf_predictionsTestData_c.toPandas().drop(['features'], axis = 1), labelCol='', Kaggle=True)
gbtr_pandas_pred_r = undo_log_transform(gbtr_predictionsTestData_r.toPandas().drop(['features'], axis = 1), labelCol='', Kaggle=True)
gbtr_pandas_pred_c = undo_log_transform(gbtr_predictionsTestData_c.toPandas().drop(['features'], axis = 1), labelCol='', Kaggle=True)
rf_pandas_pred_r.loc[(rf_pandas_pred_r['prediction'] < 0), 'prediction'] = 0
rf_pandas_pred_c.loc[(rf_pandas_pred_c['prediction'] < 0), 'prediction'] = 0
gbtr_pandas_pred_r.loc[(gbtr_pandas_pred_r['prediction'] < 0), 'prediction'] = 0
gbtr_pandas_pred_c.loc[(gbtr_pandas_pred_c['prediction'] < 0), 'prediction'] = 0
pandas_pred_sum = rf_pandas_pred_r.copy()
pandas_pred_sum['count'] = 0.0
pandas_pred_sum['count'] = ratio * (rf_pandas_pred_r['prediction'] + rf_pandas_pred_c['prediction']) + \
(1.0 - ratio) * (gbtr_pandas_pred_r['prediction'] + gbtr_pandas_pred_c['prediction'])
pandas_pred_sum['count'] = np.round(pandas_pred_sum['count']).astype(int)
# other option to truncate floating point number instead of round
# pandas_pred_sum['count'] = pandas_pred_sum['count'].astype(int)
return pandas_pred_sum.rename(columns={"label_count": "datetime"})[['datetime','count']]
We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance. This will allow us to jointly choose parameters for all Pipeline stages. A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator. We use a ParamGridBuilder to construct a grid of parameters to search over.
| regularizer $R(w)$ | gradient or sub-gradient | |
|---|---|---|
| zero (unregularized) | 0 | 0 |
| L2 | $\frac{1}{2}\|w\|^2$ | $w$ |
| L1 | $\|w\|$ | $\mathrm{sign}(w)$ |
| elastic net | $\alpha \|w\| + (1-\alpha)\frac{1}{2}\|w\|^2$ | $\alpha \mathrm{sign}(w) + (1-\alpha) w$ |
lr = LinearRegression(maxIter=1000)
lr_cv_models_dict = {}
regParam = [0.01, 0.001, 0.0001, 0.00001]
elasticNetParam = [0, 0.25, 0.5, 0.75, 1.0]
pipeline = Pipeline(stages=[lr])
for label in ['label_registered','label_casual', 'label_count']:
paramGrid = ParamGridBuilder() \
.addGrid(lr.regParam, regParam) \
.addGrid(lr.elasticNetParam, elasticNetParam) \
.addGrid(lr.labelCol, [label]) \
.build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=RegressionEvaluator(labelCol=label, metricName="rmse"),
numFolds=5) # (5x5)x5 = 125 models to check
start = time.time()
# Run cross-validation, and choose the best set of parameters on dummy dataset.
# trainingData_dummy + testData_dummy = whole Kaggle Train data, we separarated this sets to simply evaluate models
# we cross-validate on whole Kaggle train data to find best params to predict whole Kaggle Test set
cvModel = crossval.fit(trainingData_dummy.unionAll(testData_dummy))
print("======= CV for " + label + " =========")
end = time.time()
print("Time taken to train model with k-fold cv: " + str(end - start) + " seconds")
bestModel = cvModel.bestModel.stages[0]._java_obj
print("RegParam: " + str(bestModel.getRegParam()))
print("ElasticNetParam: " + str(bestModel.getElasticNetParam()))
lr_cv_models_dict[label] = cvModel
print()
Now that we have fit our model, let's evaluate its performance by predicting off the test values with the best values from cross validation!.
# lr = LinearRegression(regParam=0.00001, elasticNetParam=0.75, maxIter=1000)
# for label in ['label_registered','label_casual', 'label_count']:
# model = lr.setLabelCol(label).fit(trainingData_dummy)
# lr_cv_models_dict[label] = model
lr_predictionsTestData_r, lr_predictionsTestData_c, lr_predictionsTestData_count = make_prediction(lr_cv_models_dict, dummy=True)
Let's evaluate our model performance by calculating the evaluation metrics. Calculate the Mean Absolute Error, Mean Squared Error, Root Mean Squared Error, and the Root Mean Squared Log Error . Refer cell above for the formulas.
lr_pred_dict = evaluate_prediction(lr_predictionsTestData_r, lr_predictionsTestData_c, lr_predictionsTestData_count)
coefficients = pd.DataFrame(LinearRegression(regParam=0.00001, elasticNetParam=0.75, maxIter=100)\
.setLabelCol('label_count').fit(trainingData_dummy).coefficients.array,dummy_cols[:-3])
coefficients.columns = ['Coefficient']
Hour variables have the biggest coefficients values in linera regression equation. Amongs them, peak hours have the biggest one.
plt.figure(figsize=(12,20))
sns.heatmap(coefficients, cmap='coolwarm')
Let's quickly explore the residuals to make sure everything was okay with our data. It's good idea to plot a histogram of the residuals and make sure it looks normally distributed.
plt.scatter(lr_pred_dict['sum']['label_count'],lr_pred_dict['sum']['prediction'])
plt.xlabel('Y Test')
plt.ylabel('Predicted Y')
plt.title('Linear Regression')
Create a scatterplot of the real test values versus the predicted values.
sns.distplot((lr_pred_dict['sum']['label_count'] - lr_pred_dict['sum']['prediction']),bins=50);
plt.xlabel('Linear Regression Residuals')
Random forests are ensembles of decision trees. Random forests are one of the most successful machine learning models for classification and regression. They combine many decision trees in order to reduce the risk of overfitting. Like decision trees, random forests handle categorical features, extend to the multiclass classification setting, do not require feature scaling, and are able to capture non-linearities and feature interactions.
Random forests train a set of decision trees separately, so the training can be done in parallel. The algorithm injects randomness into the training process so that each decision tree is a bit different. Combining the predictions from each tree reduces the variance of the predictions, improving the performance on test data.
The randomness injected into the training process includes:
Apart from these randomizations, decision tree training is done in the same way as for individual decision trees.To make a prediction on a new instance, a random forest must aggregate the predictions from its set of decision trees. This aggregation is done differently for classification and regression.For regression each tree predicts a real value. The label is predicted to be the average of the tree predictions.
# model = RandomForestRegressor(featuresCol="indexedFeatures")
# numTrees = [30, 50, 100]
# maxDepth = [15, 20, 25, 30]
# rf_cv_models_dict = {}
# # Automatically identify categorical features, and index them.
# # Set maxCategories so features with > maxCategories distinct values are treated as continuous.
# featureIndexer =\
# VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=31).fit(trainingData_no_dummy.unionAll(testData_no_dummy).unionAll(testKaggle_no_dummy))
# pipeline = Pipeline(stages=[featureIndexer, model])
# for label in ['label_registered','label_casual', 'label_count']:
# paramGrid = ParamGridBuilder() \
# .addGrid(model.numTrees, numTrees) \
# .addGrid(model.maxDepth, maxDepth) \
# .addGrid(model.labelCol, [label]) \
# .build()
# crossval = CrossValidator(estimator=pipeline,
# estimatorParamMaps=paramGrid,
# evaluator=RegressionEvaluator(labelCol=label, metricName="rmse"),
# numFolds=5) # (5x5)x5 = 125 models to check
# start = time.time()
# cvModel = crossval.fit(trainingData_no_dummy.unionAll(testData_no_dummy))
# print("======= CV for " + label + " =========")
# end = time.time()
# print("Time taken to train model with k-fold cv: " + str(end - start) + " seconds")
# bestModel = cvModel.bestModel.stages[0]._java_obj
# print("NumTrees: " + str(bestModel.getNumTrees()))
# print("MaxDepth: " + str(bestModel.getMaxDepth()))
# rf_cv_models_dict[label] = cvModel
# print()
Create models with params from k-fold cross validation
# featureIndexer est has to be fitted to all data (train and test) in order to work properly
featureIndexer =\
VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=24).fit(trainingData_no_dummy.unionAll(testData_no_dummy).unionAll(testKaggle_no_dummy))
#rfr = RandomForestRegressor(numTrees=1000, maxDepth=15, maxBins=32, featuresCol="indexedFeatures")
rfr = RandomForestRegressor(numTrees=50, maxDepth=30)
rf_cv_models_dict = {}
for label in ['label_registered','label_casual', 'label_count']:
pipeline = Pipeline(stages=[featureIndexer, rfr.setLabelCol(label)])
model = pipeline.fit(trainingData_no_dummy)
rf_cv_models_dict[label] = model
rf_predictionsTestData_r, rf_predictionsTestData_c, rf_predictionsTestData_count = make_prediction(rf_cv_models_dict, dummy=False)
rf_pred_dict = evaluate_prediction(rf_predictionsTestData_r, rf_predictionsTestData_c, rf_predictionsTestData_count)
plt.scatter(rf_pred_dict['sum']['label_count'],rf_pred_dict['sum']['prediction'])
plt.xlabel('Y Test')
plt.ylabel('Predicted Y')
plt.title('Random Forest Regression')
sns.distplot((rf_pred_dict['sum']['label_count'] - rf_pred_dict['sum']['prediction']),bins=50);
plt.xlabel('Random Forest Regression Residuals')
Gradient-Boosted Trees (GBTs) are ensembles of decision trees. GBTs iteratively train decision trees in order to minimize a loss function. Like decision trees, GBTs handle categorical features, extend to the multiclass classification setting, do not require feature scaling, and are able to capture non-linearities and feature interactions.
Both Gradient-Boosted Trees (GBTs) and Random Forests are algorithms for learning ensembles of trees, but the training processes are different. There are several practical trade-offs:
In short, both algorithms can be effective, and the choice should be based on the particular dataset.
# model = GBTRegressor(maxMemoryInMB=1024, maxBins=128, maxIter=150, cacheNodeIds=True, checkpointInterval=200, featuresCol="indexedFeatures")
# minInstancesPerNode = [7,8,9,10,11,12]
# maxDepth = [4,5,6,7]
# gbtr_cv_models_dict = {}
# # Automatically identify categorical features, and index them.
# # Set maxCategories so features with > maxCategories distinct values are treated as continuous.
# featureIndexer =\
# VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=31).fit(trainingData_no_dummy.unionAll(testData_no_dummy))
# pipeline = Pipeline(stages=[featureIndexer, model])
# for label in ['label_registered','label_casual', 'label_count']:
# paramGrid = ParamGridBuilder() \
# .addGrid(model.minInstancesPerNode, minInstancesPerNode) \
# .addGrid(model.maxDepth, maxDepth) \
# .addGrid(model.labelCol, [label]) \
# .build()
# crossval = CrossValidator(estimator=pipeline,
# estimatorParamMaps=paramGrid,
# evaluator=RegressionEvaluator(labelCol=label, metricName="rmse"),
# numFolds=4) # (5x5)x5 = 125 models to check
# start = time.time()
# cvModel = crossval.fit(trainingData_no_dummy.unionAll(testData_no_dummy))
# print("======= CV for " + label + " =========")
# end = time.time()
# print("Time taken to train model with k-fold cv: " + str(end - start) + " seconds")
# bestModel = cvModel.bestModel.stages[0]._java_obj
# print("MinInstancesPerNode: " + str(bestModel.getMinInstancesPerNode()))
# print("MaxDepth: " + str(bestModel.getMaxDepth()))
# gbtr_cv_models_dict[label] = cvModel
# print()
# featureIndexer est has to be fitted to all data (train and test) in order to work properly
featureIndexer =\
VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=24).fit(trainingData_no_dummy.unionAll(testData_no_dummy).unionAll(testKaggle_no_dummy))
#gbt = GBTRegressor(maxDepth=5, minInstancesPerNode=10, maxMemoryInMB=512, maxBins=32, maxIter=150, subsamplingRate=0.7, featuresCol="indexedFeatures")
gbt = GBTRegressor(maxDepth=5, minInstancesPerNode=10, maxMemoryInMB=512, maxBins=128, maxIter=150)
gbtr_cv_models_dict = {}
for label in ['label_registered','label_casual', 'label_count']:
pipeline = Pipeline(stages=[featureIndexer, gbt.setLabelCol(label)])
model = pipeline.fit(trainingData_no_dummy)
gbtr_cv_models_dict[label] = model
gbtr_predictionsTestData_r, gbtr_predictionsTestData_c, gbtr_predictionsTestData_count = make_prediction(gbtr_cv_models_dict, dummy=False)
gbtr_pred_dict = evaluate_prediction(gbtr_predictionsTestData_r, gbtr_predictionsTestData_c, gbtr_predictionsTestData_count)
plt.scatter(gbtr_pred_dict['sum']['label_count'],gbtr_pred_dict['sum']['prediction'])
plt.xlabel('Y Test')
plt.ylabel('Predicted Y')
plt.title('Gradient Boosted Tree Regression')
sns.distplot((gbtr_pred_dict['sum']['label_count'] - gbtr_pred_dict['sum']['prediction']),bins=50);
plt.xlabel('Gradient Boosted Tree Regression Residuals')
We trained three machine learning algorithms. We used K-fold cross validation for parameter tuning. Of course we couldn't check all posibly combination due to computancy limitation. Especially Gradient Boosted Regression was hard to train becouse of its iterative manner. On the other hand it predicts very fast - this is one of the reason of its popularity (ex. for page rank cases). As we might suspect, non-linearity models gave us better results. Gradient Boosted Regression won the game with the score 0.3107 !
RMSLE |
||||
|---|---|---|---|---|
| registered | casual | registered + casual | count | |
| Linear Regression | 0.5539 |
0.5867 | 0.5333 |
0.5472 |
| Random Forest Regression | 0.3445 |
0.4889 | 0.3409 |
0.3614 |
| Gradient Boosted Regression | 0.3117 |
0.5089 | 0.3107 |
0.3264 |
Training two models to predict casual and registred rentals accordingly gave us slightly better results. In all models prediction for casual users were significant worse than for registered users. It follows that there is no strict rule for renting cars for casual users / there is some other variable, not present in our dataset, that could better model casual renting.
RMSLE can be used when we don’t want to penalize huge differences when both the values are huge numbers, on the other hand it's harder to compare those results to others. That's why we also measured $R^2$ metric which in a way is standariezed.
$R^2$ |
||||
|---|---|---|---|---|
| registered | casual | registered + casual | count | |
| Linear Regression | 0.8041 |
0.7262 | 0.8085 |
0.8019 |
| Random Forest Regression | 0.9143 |
0.8923 | 0.9204 |
0.9109 |
| Gradient Boosted Regression | 0.9373 |
0.8941 | 0.9427 |
0.9293 |
Depends on $R^2$ value we assume that compatibility is:
It's worth to notice that combinig two models results in better accuracy than each of them had separately.
RMSLE |
||||
|---|---|---|---|---|
| registered | casual | registered + casual | count | |
| Random Forest Regression | 0.3117 |
0.5089 |
0.3107 |
0.3264 |
So it's good idea to combine prediction from two best machine learning algorithms: Random Forest Regression and Gradient Boosted Regression
We need to plot curve ratio to RMSLE to verify the split proportion
rmsle_list = []
ratio_array = np.linspace(0.05,1,20)
for ratio in ratio_array:
rmsle_list.append(evaluate_mixed_prediction(rf_pred_dict, gbtr_pred_dict, ratio=float(ratio))['rmsle_sum'])
We can see that split proportion around 0.3 gives us best result.
plt.plot(ratio_array, rmsle_list)
plt.xlabel('Ratio')
plt.ylabel('RMSLE')
plt.title('Ratio * RandomForest + (1 - Ratio) * GradientBoostedTree')
'%.3f' % min(rmsle_list)
Get the best ratio split.
#best_ratio = float(ratio_array[rmsle_list.index(min(rmsle_list))])
best_ratio = float(0.3)
# featureIndexer has to be fitted to all data (train and test) in order to work properly
featureIndexer =\
VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=24)\
.fit(trainingData_no_dummy.unionAll(testData_no_dummy).unionAll(testKaggle_no_dummy))
Previous models were trained on 0.8 part of whole train data set. Now we need to train on whole dataset with chosen params.
# model params taken from previous cv
#rfr = RandomForestRegressor(numTrees=1000, maxDepth=15, maxBins=32, featuresCol="indexedFeatures")
rfr = RandomForestRegressor(numTrees=100, maxDepth=20)
rf_cv_models_dict_final = {}
for label in ['label_registered','label_casual', 'label_count']:
pipeline = Pipeline(stages=[featureIndexer, rfr.setLabelCol(label)])
model = pipeline.fit(trainingData_no_dummy.unionAll(testData_no_dummy))
rf_cv_models_dict_final[label] = model
# model params taken from previous cv
#gbt = GBTRegressor(maxDepth=5, minInstancesPerNode=10, maxMemoryInMB=512, maxBins=32, maxIter=150, subsamplingRate=0.7, featuresCol="indexedFeatures")
gbt = GBTRegressor(maxDepth=5, minInstancesPerNode=10, maxMemoryInMB=512, maxBins=128, maxIter=150)
gbtr_cv_models_dict_final = {}
for label in ['label_registered','label_casual', 'label_count']:
pipeline = Pipeline(stages=[featureIndexer, gbt.setLabelCol(label)])
model = pipeline.fit(trainingData_no_dummy.unionAll(testData_no_dummy))
gbtr_cv_models_dict_final[label] = model
gbtr_predictionsTestData_r, gbtr_predictionsTestData_c, gbtr_predictionsTestData_count = make_prediction(gbtr_cv_models_dict_final, dummy=False, Kaggle=True)
rf_predictionsTestData_r, rf_predictionsTestData_c, rf_predictionsTestData_count = make_prediction(rf_cv_models_dict_final, dummy=False, Kaggle=True)
Get 'raw' prediction on Kaggle test dataset from best two models: Random Forest Regression and Gradient Boosted Regression Tree.
kaggleSubmission = predict_Kaggle_test(rf_predictionsTestData_r, rf_predictionsTestData_c, \
gbtr_predictionsTestData_r, gbtr_predictionsTestData_c, ratio = best_ratio)
kaggleSubmission.head(10)
kaggleSubmission.info()
kaggleSubmission.to_csv('kaggleSubmission.csv', index=False)
Due to computation and time limitations there are ares where additional research could be done: